<!DOCTYPE html>
<html lang="en">
<head>
  <title>Streams - Vert.x</title>
  <meta charset="utf-8">
  <meta http-equiv="X-UA-Compatible" content="IE=edge">
  <meta content="width=device-width, initial-scale=1.0" name="viewport">
  <meta content="Vert.x is a tool-kit for building reactive applications on the JVM." name="description">
  <link href="http://vertx.io/stylesheets/docs.css" media="screen" rel="stylesheet">
  <link href="http://vertx.io/stylesheets/font-awesome.min.css" media="screen" rel="stylesheet">
  <link href="http://vertx.io/javascripts/styles/rainbow.min.css" media="screen" rel="stylesheet">
  <!-- IE 6-8 support of HTML 5 elements -->
  <!--[if lt IE 9]>
  <script src="http://static.jboss.org/theme/js/libs/html5/pre3.6/html5.min.js"></script>
  <![endif]-->

  <link rel="apple-touch-icon" sizes="57x57" href="http://vertx.io/assets/favicons/vertx-favicon-7/apple-touch-icon-57x57.png">
  <link rel="apple-touch-icon" sizes="60x60" href="http://vertx.io/assets/favicons/vertx-favicon-7/apple-touch-icon-60x60.png">
  <link rel="apple-touch-icon" sizes="72x72" href="http://vertx.io/assets/favicons/vertx-favicon-7/apple-touch-icon-72x72.png">
  <link rel="apple-touch-icon" sizes="76x76" href="http://vertx.io/assets/favicons/vertx-favicon-7/apple-touch-icon-76x76.png">
  <link rel="apple-touch-icon" sizes="114x114" href="http://vertx.io/assets/favicons/vertx-favicon-7/apple-touch-icon-114x114.png">
  <link rel="apple-touch-icon" sizes="120x120" href="http://vertx.io/assets/favicons/vertx-favicon-7/apple-touch-icon-120x120.png">
  <link rel="apple-touch-icon" sizes="144x144" href="http://vertx.io/assets/favicons/vertx-favicon-7/apple-touch-icon-144x144.png">
  <link rel="apple-touch-icon" sizes="152x152" href="http://vertx.io/assets/favicons/vertx-favicon-7/apple-touch-icon-152x152.png">
  <link rel="apple-touch-icon" sizes="180x180" href="http://vertx.io/assets/favicons/vertx-favicon-7/apple-touch-icon-180x180.png">
  <link rel="icon" type="image/png" href="http://vertx.io/assets/favicons/vertx-favicon-7/favicon-32x32.png" sizes="32x32">
  <link rel="icon" type="image/png" href="http://vertx.io/assets/favicons/vertx-favicon-7/android-chrome-192x192.png" sizes="192x192">
  <link rel="icon" type="image/png" href="http://vertx.io/assets/favicons/vertx-favicon-7/favicon-96x96.png" sizes="96x96">
  <link rel="icon" type="image/png" href="http://vertx.io/assets/favicons/vertx-favicon-7/favicon-16x16.png" sizes="16x16">
  <link rel="manifest" href="http://vertx.io/assets/favicons/vertx-favicon-7/manifest.json">
  <link rel="mask-icon" href="http://vertx.io/assets/favicons/vertx-favicon-7/safari-pinned-tab.svg" color="#5bbad5">
  <meta name="msapplication-TileColor" content="#7d3194">
  <meta name="msapplication-TileImage" content="http://vertx.io/assets/favicons/vertx-favicon-7/mstile-144x144.png">
  <meta name="theme-color" content="#ffffff">

  <link href="http://fonts.googleapis.com/css?family=Ubuntu:400,500,700,400italic" rel="stylesheet" type="text/css">
  <link rel="alternate" type="application/rss+xml" title="RSS"
     href="http://vertx.io/feed.xml">
  <script>
    (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
      (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
      m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
    })(window,document,'script','//www.google-analytics.com/analytics.js','ga');
    ga('create', 'UA-30144458-1', 'auto');
    ga('create', 'UA-71153120-1', 'auto', 'tracker');
    ga('send', 'pageview');
    ga('tracker.send', 'pageview');
  </script>
</head>
<body>

<a href="http://www.reactivemanifesto.org/" id="reactive-manifesto-banner">
  <img style="border: 0; position: fixed; right: 0; top:0; z-index: 9000"
    src="http://d379ifj7s9wntv.cloudfront.net/reactivemanifesto/images/ribbons/we-are-reactive-black-right.png">
</a>

<a id="skippy" class="sr-only sr-only-focusable" href="#content"><div class="container"><span class="skiplink-text">Skip to main content</span></div></a>

<header class="navbar navbar-default navbar-static-top" id="top" role="banner">
  <div class="container">
    <div class="navbar-header">
      <button class="navbar-toggle collapsed" type="button" data-toggle="collapse" data-target="#vertx-navbar-collapse">
        <span class="sr-only">Toggle navigation</span>
        <span class="icon-bar"></span>
        <span class="icon-bar"></span>
        <span class="icon-bar"></span>
      </button>
      <a href="http://vertx.io/" class="navbar-brand"><img alt="Brand" src="http://vertx.io/assets/logo-sm.png"></a>
    </div>
    <nav class="collapse navbar-collapse" id="vertx-navbar-collapse">
      <ul class="nav navbar-nav navbar-right">
        <li><a href="http://vertx.io/download/">Download</a></li>
        <li><a href="http://vertx.io/docs/">Documentation</a></li>
        <li><a href="https://github.com/vert-x3/wiki/wiki">Wiki</a></li>
        <li><a href="http://vertx.io/community/">Community</a></li>
        <li><a href="http://vertx.io/materials/">Materials</a></li>
        <li><a href="http://vertx.io/blog/">Blog</a></li>        
      </ul>
    </nav>
  </div>
</header>



  <div class="page-header" id="content">
    <div class="container">
      <div class="row">
        <div class="col-sm-12">
          <h1>Streams</h1>
          
        </div>
      </div>
    </div>
  </div>




<div id="content">
  <div class="container docs-content">
    <div class="row">
      <div class="col-sm-12 col-md-push-9 col-md-3 hidden-xs hidden-sm">
        <div id="sidebar" data-spy="affix">
          <ul class="sectlevel1">
<li><a href="#_streams">Streams</a>
<ul class="sectlevel2">
<li><a href="#_readstream">ReadStream</a></li>
<li><a href="#_writestream">WriteStream</a></li>
<li><a href="#_pump">Pump</a></li>
</ul>
</li>
</ul>
        </div>
      </div>
      <div class="col-sm-12 col-md-pull-3 col-md-9">
        <div class="toc hidden-md hidden-lg">
          <h2>Table of Contents</h2>
          <ul class="sectlevel1">
<li><a href="#_streams">Streams</a>
<ul class="sectlevel2">
<li><a href="#_readstream">ReadStream</a></li>
<li><a href="#_writestream">WriteStream</a></li>
<li><a href="#_pump">Pump</a></li>
</ul>
</li>
</ul>
        </div>
        <div class="sect1">
<h2 id="_streams">Streams</h2>
<div class="sectionbody">
<div class="paragraph">
<p>There are several objects in Vert.x that allow items to be read from and written.</p>
</div>
<div class="paragraph">
<p>In previous versions the streams.adoc package was manipulating <code><a href="../../ceylondoc/vertx-core//buffer/Buffer.type.html">Buffer</a></code>
objects exclusively. From now, streams are not coupled to buffers anymore and they work with any kind of objects.</p>
</div>
<div class="paragraph">
<p>In Vert.x, write calls return immediately, and writes are queued internally.</p>
</div>
<div class="paragraph">
<p>It&#8217;s not hard to see that if you write to an object faster than it can actually write the data to
its underlying resource, then the write queue can grow unbounded - eventually resulting in
memory exhaustion.</p>
</div>
<div class="paragraph">
<p>To solve this problem a simple flow control (<em>back-pressure</em>) capability is provided by some objects in the Vert.x API.</p>
</div>
<div class="paragraph">
<p>Any flow control aware object that can be <em>written-to</em> implements <code><a href="../../ceylondoc/vertx-core//streams/WriteStream.type.html">WriteStream</a></code>,
while any flow control object that can be <em>read-from</em> is said to implement <code><a href="../../ceylondoc/vertx-core//streams/ReadStream.type.html">ReadStream</a></code>.</p>
</div>
<div class="paragraph">
<p>Let&#8217;s take an example where we want to read from a <code>ReadStream</code> then write the data to a <code>WriteStream</code>.</p>
</div>
<div class="paragraph">
<p>A very simple example would be reading from a <code><a href="../../ceylondoc/vertx-core//net/NetSocket.type.html">NetSocket</a></code> then writing back to the
same <code>NetSocket</code> - since <code>NetSocket</code> implements both <code>ReadStream</code> and <code>WriteStream</code>. Note that this works
between any <code>ReadStream</code> and <code>WriteStream</code> compliant object, including HTTP requests, HTTP responses,
async files I/O, WebSockets, etc.</p>
</div>
<div class="paragraph">
<p>A naive way to do this would be to directly take the data that has been read and immediately write it
to the <code>NetSocket</code>:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="prettyprint highlight"><code class="language-ceylon" data-lang="ceylon">value server = vertx.createNetServer(NetServerOptions {
  port = 1234;
  host = "localhost";
});
server.connectHandler((NetSocket sock) {
  sock.handler((Buffer buffer) {
    // Write the data straight back
    sock.write(buffer);
  });
}).listen();</code></pre>
</div>
</div>
<div class="paragraph">
<p>There is a problem with the example above: if data is read from the socket faster than it can be
written back to the socket, it will build up in the write queue of the <code>NetSocket</code>, eventually
running out of RAM. This might happen, for example if the client at the other end of the socket
wasn&#8217;t reading fast enough, effectively putting back-pressure on the connection.</p>
</div>
<div class="paragraph">
<p>Since <code>NetSocket</code> implements <code>WriteStream</code>, we can check if the <code>WriteStream</code> is full before
writing to it:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="prettyprint highlight"><code class="language-ceylon" data-lang="ceylon">value server = vertx.createNetServer(NetServerOptions {
  port = 1234;
  host = "localhost";
});
server.connectHandler((NetSocket sock) {
  sock.handler((Buffer buffer) {
    if (!sock.writeQueueFull()) {
      sock.write(buffer);
    };
  });

}).listen();</code></pre>
</div>
</div>
<div class="paragraph">
<p>This example won&#8217;t run out of RAM but we&#8217;ll end up losing data if the write queue gets full. What we
really want to do is pause the <code>NetSocket</code> when the write queue is full:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="prettyprint highlight"><code class="language-ceylon" data-lang="ceylon">value server = vertx.createNetServer(NetServerOptions {
  port = 1234;
  host = "localhost";
});
server.connectHandler((NetSocket sock) {
  sock.handler((Buffer buffer) {
    sock.write(buffer);
    if (sock.writeQueueFull()) {
      sock.pause();
    };
  });
}).listen();</code></pre>
</div>
</div>
<div class="paragraph">
<p>We&#8217;re almost there, but not quite. The <code>NetSocket</code> now gets paused when the file is full, but we also need to unpause
it when the write queue has processed its backlog:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="prettyprint highlight"><code class="language-ceylon" data-lang="ceylon">value server = vertx.createNetServer(NetServerOptions {
  port = 1234;
  host = "localhost";
});
server.connectHandler((NetSocket sock) {
  sock.handler((Buffer buffer) {
    sock.write(buffer);
    if (sock.writeQueueFull()) {
      sock.pause();
      sock.drainHandler(() {
        sock.resume();
      });
    };
  });
}).listen();</code></pre>
</div>
</div>
<div class="dlist">
<dl>
<dt class="hdlist1">And there we have it. The `link:../../ceylondoc/vertx-core//streams/WriteStream.type.html#drainHandler((@io.vertx.codegen.annotations.Nullable </dt>
<dd>
<p>io.vertx.core.Handler))[drainHandler]` event handler will
get called when the write queue is ready to accept more data, this resumes the <code>NetSocket</code> that
allows more data to be read.</p>
</dd>
</dl>
</div>
<div class="paragraph">
<p>Wanting to do this is quite common while writing Vert.x applications, so we provide a helper class
called <code><a href="../../ceylondoc/vertx-core//streams/Pump.type.html">Pump</a></code> that does all of this hard work for you.
You just feed it the <code>ReadStream</code> plus the <code>WriteStream</code> then start it:</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="prettyprint highlight"><code class="language-ceylon" data-lang="ceylon">import io.vertx.ceylon.core.streams { pump }

...

value server = vertx.createNetServer(NetServerOptions {
  port = 1234;
  host = "localhost";
});
server.connectHandler((NetSocket sock) {
  pump.pump(sock, sock).start();
}).listen();</code></pre>
</div>
</div>
<div class="paragraph">
<p>This does exactly the same thing as the more verbose example.</p>
</div>
<div class="paragraph">
<p>Let&#8217;s now look at the methods on <code>ReadStream</code> and <code>WriteStream</code> in more detail:</p>
</div>
<div class="sect2">
<h3 id="_readstream">ReadStream</h3>
<div class="paragraph">
<p><code>ReadStream</code> is implemented by <code><a href="../../ceylondoc/vertx-core//http/HttpClientResponse.type.html">HttpClientResponse</a></code>, <code><a href="../../ceylondoc/vertx-core//datagram/DatagramSocket.type.html">DatagramSocket</a></code>,
<code><a href="../../ceylondoc/vertx-core//http/HttpClientRequest.type.html">HttpClientRequest</a></code>, <code><a href="../../ceylondoc/vertx-core//http/HttpServerFileUpload.type.html">HttpServerFileUpload</a></code>,
<code><a href="../../ceylondoc/vertx-core//http/HttpServerRequest.type.html">HttpServerRequest</a></code>, <code><a href="../../ceylondoc/vertx-core//http/HttpServerRequestStream.type.html">HttpServerRequestStream</a></code>,
<code><a href="../../ceylondoc/vertx-core//eventbus/MessageConsumer.type.html">MessageConsumer</a></code>, <code><a href="../../ceylondoc/vertx-core//net/NetSocket.type.html">NetSocket</a></code>, <code><a href="../../ceylondoc/vertx-core//net/NetSocketStream.type.html">NetSocketStream</a></code>,
<code><a href="../../ceylondoc/vertx-core//http/WebSocket.type.html">WebSocket</a></code>, <code><a href="../../ceylondoc/vertx-core//http/WebSocketStream.type.html">WebSocketStream</a></code>, <code><a href="../../ceylondoc/vertx-core//TimeoutStream.type.html">TimeoutStream</a></code>,
<code><a href="../../ceylondoc/vertx-core//file/AsyncFile.type.html">AsyncFile</a></code>.</p>
</div>
<div class="paragraph">
<p>Functions:</p>
</div>
<div class="ulist">
<ul>
<li>
<p><code>link:../../ceylondoc/vertx-core//streams/ReadStream.type.html#handler@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler[handler]</code>:
set a handler which will receive items from the ReadStream.</p>
</li>
<li>
<p><code><a href="../../ceylondoc/vertx-core//streams/ReadStream.type.html#pause()">pause</a></code>:
pause the handler. When paused no items will be received in the handler.</p>
</li>
<li>
<p><code><a href="../../ceylondoc/vertx-core//streams/ReadStream.type.html#resume()">resume</a></code>:
resume the handler. The handler will be called if any item arrives.</p>
</li>
<li>
<p><code><a href="../../ceylondoc/vertx-core//streams/ReadStream.type.html#exceptionHandler(io.vertx.core.Handler)">exceptionHandler</a></code>:
Will be called if an exception occurs on the ReadStream.</p>
</li>
<li>
<p><code>link:../../ceylondoc/vertx-core//streams/ReadStream.type.html#endHandler@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler[endHandler]</code>:
Will be called when end of stream is reached. This might be when EOF is reached if the ReadStream represents a file,
or when end of request is reached if it&#8217;s an HTTP request, or when the connection is closed if it&#8217;s a TCP socket.</p>
</li>
</ul>
</div>
</div>
<div class="sect2">
<h3 id="_writestream">WriteStream</h3>
<div class="paragraph">
<p><code>WriteStream</code> is implemented by <code><a href="../../ceylondoc/vertx-core//http/HttpClientRequest.type.html">HttpClientRequest</a></code>, <code><a href="../../ceylondoc/vertx-core//http/HttpServerResponse.type.html">HttpServerResponse</a></code>
<code><a href="../../ceylondoc/vertx-core//http/WebSocket.type.html">WebSocket</a></code>, <code><a href="../../ceylondoc/vertx-core//net/NetSocket.type.html">NetSocket</a></code>, <code><a href="../../ceylondoc/vertx-core//file/AsyncFile.type.html">AsyncFile</a></code>,
<code><a href="../../ceylondoc/vertx-core//datagram/PacketWritestream.type.html">PacketWritestream</a></code> and <code><a href="../../ceylondoc/vertx-core//eventbus/MessageProducer.type.html">MessageProducer</a></code></p>
</div>
<div class="paragraph">
<p>Functions:</p>
</div>
<div class="ulist">
<ul>
<li>
<p><code><a href="../../ceylondoc/vertx-core//streams/WriteStream.type.html#write(java.lang.Object)">write</a></code>:
write an object to the WriteStream. This method will never block. Writes are queued internally and asynchronously
written to the underlying resource.</p>
</li>
<li>
<p><code><a href="../../ceylondoc/vertx-core//streams/WriteStream.type.html#setWriteQueueMaxSize(int)">setWriteQueueMaxSize</a></code>:
set the number of object at which the write queue is considered <em>full</em>, and the method <code><a href="../../ceylondoc/vertx-core//streams/WriteStream.type.html#writeQueueFull()">writeQueueFull</a></code>
returns <code>true</code>. Note that, when the write queue is considered full, if write is called the data will still be accepted
and queued. The actual number depends on the stream implementation, for <code><a href="../../ceylondoc/vertx-core//buffer/Buffer.type.html">Buffer</a></code> the size
represents the actual number of bytes written and not the number of buffers.</p>
</li>
<li>
<p><code><a href="../../ceylondoc/vertx-core//streams/WriteStream.type.html#writeQueueFull()">writeQueueFull</a></code>:
returns <code>true</code> if the write queue is considered full.</p>
</li>
<li>
<p><code><a href="../../ceylondoc/vertx-core//streams/WriteStream.type.html#exceptionHandler(io.vertx.core.Handler)">exceptionHandler</a></code>:
Will be called if an exception occurs on the <code>WriteStream</code>.</p>
</li>
<li>
<p><code>link:../../ceylondoc/vertx-core//streams/WriteStream.type.html#drainHandler@io.vertx.codegen.annotations.Nullable :: io.vertx.core.Handler[drainHandler]</code>:
The handler will be called if the <code>WriteStream</code> is considered no longer full.</p>
</li>
</ul>
</div>
</div>
<div class="sect2">
<h3 id="_pump">Pump</h3>
<div class="paragraph">
<p>Instances of Pump have the following methods:</p>
</div>
<div class="ulist">
<ul>
<li>
<p><code><a href="../../ceylondoc/vertx-core//streams/Pump.type.html#start()">start</a></code>:
Start the pump.</p>
</li>
<li>
<p><code><a href="../../ceylondoc/vertx-core//streams/Pump.type.html#stop()">stop</a></code>:
Stops the pump. When the pump starts it is in stopped mode.</p>
</li>
<li>
<p><code><a href="../../ceylondoc/vertx-core//streams/Pump.type.html#setWriteQueueMaxSize(int)">setWriteQueueMaxSize</a></code>:
This has the same meaning as <code><a href="../../ceylondoc/vertx-core//streams/WriteStream.type.html#setWriteQueueMaxSize(int)">setWriteQueueMaxSize</a></code> on the <code>WriteStream</code>.</p>
</li>
</ul>
</div>
<div class="paragraph">
<p>A pump can be started and stopped multiple times.</p>
</div>
<div class="paragraph">
<p>When a pump is first created it is <em>not</em> started. You need to call the <code>start()</code> method to start it.</p>
</div>
</div>
</div>
</div>

        

        
          <div id="footer">
            <div id="footer-text">
              
                Last updated 2016-09-12 08:38:04 CEST
              
              
            </div>
          </div>
        
      </div>
    </div>
  </div>
</div>

<footer>
  <div class="container">
    <div class="row">
      <div class="col-xs-6 col-sm-3 col-md-3 col-lg-2">
        <h2>Vert.x</h2>
        <ul class="list-unstyled">
          <li><a href="http://vertx.io/">Home</a></li>
          <li><a href="http://vertx.io/download/">Download</a></li>
          <li><a href="http://vertx.io/docs/">Documentation</a></li>
          <li><a href="https://github.com/vert-x3/wiki/wiki">Wiki</a></li>
          <li><a href="http://vertx.io/blog/">Blog</a></li>
          <li><a href="http://vertx.io/vertx2/" class="vertx-2-link">Vert.x 2</a></li>
        </ul>
      </div>
      <div class="col-xs-6 col-sm-3 col-md-3 col-lg-2">
        <h2>Community</h2>
        <ul class="list-unstyled">
          <li><a href="http://vertx.io/community/">Help &amp; Contributors</a></li>
          <li><a href="http://vertx.io/materials/">Learning materials</a></li>
          <li><a href="https://groups.google.com/forum/?fromgroups#!forum/vertx">User Group</a></li>
          <li><a href="https://groups.google.com/forum/?fromgroups#!forum/vertx-dev">Developer Group</a></li>
        </ul>
      </div>

      <div class="col-xs-12 col-sm-6 col-lg-offset-2 col-md-6 copyright">
        <p>Vert.x is open source and dual licensed under the <a href="https://www.eclipse.org/org/documents/epl-v10.php">Eclipse Public License 1.0</a> and <a href="https://www.apache.org/licenses/LICENSE-2.0.html">Apache License 2.0</a>.</p>
        <p>This website is licensed under the <a href="http://creativecommons.org/licenses/by-sa/3.0/">CC BY-SA 3.0 License</a>.<br>
        Design by <a href="http://www.michel-kraemer.com">Michel Kr&auml;mer</a>. <a href="http://www.entypo.com">Entypo pictograms</a> by Daniel Bruce.</p>
        <div class="row">
          <div class="col-xs-12 col-lg-offset-1 col-md-5">
            <a href="http://eclipse.org">
            <img class="logo eclipse-logo" src="http://vertx.io/assets/eclipse_logo_grey_small.png" width="204" height="48">
            </a>
          </div>
          <div class="col-xs-12 col-md-offset-2 col-lg-offset-0 col-md-5">
            <a href="http://cloudbees.com">
            <img class="logo cloudbees-logo" src="http://vertx.io/assets/Button-Built-on-CB-1-grey.png" width="180" height="48">
           </a>
          </div>
          <div class="col-xs-12 col-md-offset-2 col-lg-offset-1 col-md-5 jprofiler">
            <a href="http://www.ej-technologies.com/products/jprofiler/overview.html"
            style="text-decoration:none">
            <img class="logo jprofiler-logo" src="http://vertx.io/assets/jprofiler-logo.png" width="48" height="48"><span class="jprofiler-logo">&nbsp; JPROFILER</span>
            </a>
          </div>
        </div>
      </div>
    </div>
  </div>
</footer>

<script src="http://static.jboss.org/theme/js/libs/jquery/jquery-1.9.1.min.js"></script>
<script src="http://vertx.io/javascripts/bootstrap.min.js"></script>
<script src="http://vertx.io/javascripts/highlight.pack.js"></script>
<script>hljs.initHighlightingOnLoad();</script>



<script src="http://vertx.io/javascripts/sidebar.js"></script>


</body>
</html>
